package org.example.state;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.example.data.WaterSensor;
import org.example.function.WaterSensorMapFunction;

import java.time.Duration;

/**
 * Demonstrates how to configure a Flink state backend for a keyed job that keeps
 * TTL-enabled {@link ValueState}.
 *
 * <p>The job reads text lines from a socket on {@code localhost:9999}, converts them to
 * {@link WaterSensor} records, keys them by sensor id and, for every record, emits the value
 * previously stored for that key before overwriting it with the record's {@code vc}.
 *
 * <p>State backend selection: pass {@code hashmap} (case-insensitive) as the first program
 * argument to use {@link HashMapStateBackend}; with any other argument, or none, the job uses
 * {@link EmbeddedRocksDBStateBackend}.
 */
public class StateBackendDemo {

    /** First program argument that selects the heap-based HashMap state backend. */
    private static final String HASHMAP_BACKEND_ARG = "hashmap";

    /**
     * Builds and runs the demo job.
     *
     * @param args optional; {@code args[0]} equal to {@code hashmap} selects the HashMap state
     *             backend, anything else (or no argument) selects the embedded RocksDB backend
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Configure exactly one state backend. Calling setStateBackend twice only keeps the
        // last one, so the choice is made explicitly here instead.
        boolean useHashMapBackend = args.length > 0 && HASHMAP_BACKEND_ARG.equalsIgnoreCase(args[0]);
        if (useHashMapBackend) {
            // Option 1: HashMap state backend.
            env.setStateBackend(new HashMapStateBackend());
        } else {
            // Option 2 (default): embedded RocksDB state backend.
            env.setStateBackend(new EmbeddedRocksDBStateBackend());
        }

        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<WaterSensor> sensorDS = source.map(new WaterSensorMapFunction());

        // No timestamps/watermarks are assigned: nothing in this pipeline uses event time.
        SingleOutputStreamOperator<String> process = sensorDS
                .keyBy(WaterSensor::getId)
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    // Per-key handle to the most recently stored vc value. Initialised in open(),
                    // hence transient: it must not be serialized with the function.
                    private transient ValueState<Integer> lastVcValue;

                    /**
                     * Registers the keyed state with a 10 second time-to-live.
                     */
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // TTL of 10s, refreshed when the state is created or written;
                        // expired values are never handed back to the caller.
                        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10))
                                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                                .build();

                        ValueStateDescriptor<Integer> valueStateDescriptor =
                                new ValueStateDescriptor<>("lastIdValue", Types.INT);
                        // Attach the TTL configuration to the state descriptor.
                        valueStateDescriptor.enableTimeToLive(ttlConfig);
                        lastVcValue = getRuntimeContext().getState(valueStateDescriptor);
                    }

                    /**
                     * Emits the value currently stored for the record's key, then stores the
                     * record's vc. The emitted value is {@code null} when nothing is stored
                     * for the key (first record, or the previous value has expired).
                     */
                    @Override
                    public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                        Integer lastVc = lastVcValue.value();
                        out.collect("传感器：" + ctx.getCurrentKey() + " 状态值：" + lastVc);
                        lastVcValue.update(value.getVc());
                    }
                });

        process.printToErr("");
        env.execute("StateTtlDemo");
    }
}
